package kafka_go

import (
	"errors"
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"gitee.com/tym_hmm/kafka-go/Stragey"
	"sync"
	"sync/atomic"
)

var strategyFactory = Stragey.NewStrategyFactory()

/**
生产者客户端容器
*/
type containerProduct struct {
	build              BuildProductApi
	client             []sarama.SyncProducer
	clientCurrentIndex int32
	rLock              sync.RWMutex
}

func newContainerProduct(build BuildProductApi) *containerProduct {
	return &containerProduct{
		build:              build,
		clientCurrentIndex: 0,
	}
}

/**
获取连接列表
*/
func (this *containerProduct) getClientList() []sarama.SyncProducer {
	return this.client
}

/**
获取连接
*/
func (this *containerProduct) GetClient() (int32, sarama.SyncProducer) {
	this.rLock.Lock()
	defer this.rLock.Unlock()
	currentIndex := this.clientCurrentIndex
	clientLen := int32(len(this.client))
	var strategyIndex int32
	connStrategy := this.build.GetConnStrategy()
	switch connStrategy {
	case PRODUCT_CONN_STRATEGY_BALANCE_ROUNDROBIN:
		strategyIndex = strategyFactory.RoundRobin(currentIndex, clientLen)
	default:
		strategyIndex = strategyFactory.RoundRobin(currentIndex, clientLen)
	}
	loadIndex := strategyIndex - currentIndex

	atomic.AddInt32(&this.clientCurrentIndex, loadIndex)
	return this.clientCurrentIndex, this.client[this.clientCurrentIndex]
}

/**
添加连接
*/
func (this *containerProduct) addClient(client sarama.SyncProducer) error {
	if client == nil {
		return errors.New("product can not be empty")
	}
	this.rLock.Lock()
	defer this.rLock.Unlock()
	if this.client == nil {
		this.client = []sarama.SyncProducer{}
	}
	this.client = append(this.client, client)
	return nil
}
